package org.zjvis.datascience.service;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.zjvis.datascience.common.constant.DatabaseConstant;
import org.zjvis.datascience.common.constant.DatasetConstant;
import org.zjvis.datascience.common.dto.*;
import org.zjvis.datascience.common.enums.DataTypeEnum;
import org.zjvis.datascience.common.enums.TaskInstanceStatus;
import org.zjvis.datascience.common.enums.TaskTypeEnum;
import org.zjvis.datascience.common.model.stat.ColumnConstant;
import org.zjvis.datascience.common.sql.DataCleanSqlHelper;
import org.zjvis.datascience.common.util.*;
import org.zjvis.datascience.common.util.db.JDBCUtil;
import org.zjvis.datascience.common.vo.TaskInstancePageVO;
import org.zjvis.datascience.service.dataprovider.GPDataProvider;
import org.zjvis.datascience.service.mapper.TaskInstanceMapper;
import org.zjvis.datascience.service.mapper.TaskMapper;

import java.sql.*;
import java.util.*;

/**
 * @description TaskInstance service: manages task node instances
 * @date 2021-12-29
 */
@Service
public class TaskInstanceService {

    private final static Logger logger = LoggerFactory.getLogger(TaskInstanceService.class);

    @Autowired
    private TaskInstanceMapper taskInstanceMapper;

    @Autowired
    private TaskMapper taskMapper;

    @Autowired
    private GPDataProvider gpDataProvider;


    /**
     * Looks up a single task instance by id via the mapper.
     *
     * @param id task instance id
     * @return the mapper's result for that id
     */
    public TaskInstanceDTO queryById(Long id) {
        final TaskInstanceDTO found = taskInstanceMapper.queryById(id);
        return found;
    }

    /**
     * Saves the given task instance through the mapper.
     *
     * @param taskInstanceDTO instance to save
     * @return the id read from the DTO after the mapper call
     */
    public Long save(TaskInstanceDTO taskInstanceDTO) {
        taskInstanceMapper.save(taskInstanceDTO);
        final Long savedId = taskInstanceDTO.getId();
        return savedId;
    }


    /**
     * Updates the given task instance through the mapper.
     *
     * @param taskInstanceDTO instance carrying the new values
     * @return the id read from the DTO after the mapper call
     */
    public Long update(TaskInstanceDTO taskInstanceDTO) {
        taskInstanceMapper.update(taskInstanceDTO);
        final Long updatedId = taskInstanceDTO.getId();
        return updatedId;
    }

    /**
     * Fetches the task instances the mapper returns for a session id.
     *
     * @param sessionId session id to query by
     * @return the mapper's result list
     */
    public List<TaskInstanceDTO> queryBySessionId(Long sessionId) {
        final List<TaskInstanceDTO> sessionInstances = taskInstanceMapper.queryBySessionId(sessionId);
        return sessionInstances;
    }

    /**
     * Fetches the task instances the mapper returns for a task id.
     *
     * @param taskId task id to query by
     * @return the mapper's result list
     */
    public List<TaskInstanceDTO> queryByTaskId(Long taskId) {
        final List<TaskInstanceDTO> taskInstances = taskInstanceMapper.queryByTaskId(taskId);
        return taskInstances;
    }

    /**
     * Derives one aggregate status for all task instances of a session.
     * <p>
     * Instances are scanned in the order returned by {@link #queryBySessionId(Long)};
     * the first instance that is not SUCCESS decides the outcome:
     * KILLED, FAIL and STOP are returned as-is, while CREATE and RUNNING both
     * map to RUNNING. If every instance is SUCCESS (which is also the case for an
     * empty list) SUCCESS is returned. Any other combination yields an empty string.
     *
     * @param sessionId session whose instances are inspected
     * @return the aggregate status string, or {@link StringUtils#EMPTY}
     */
    public String queryStatusBySessionId(Long sessionId) {
        final List<TaskInstanceDTO> sessionInstances = queryBySessionId(sessionId);

        final String success = TaskInstanceStatus.SUCCESS.toString();
        final String killed = TaskInstanceStatus.KILLED.toString();
        final String failed = TaskInstanceStatus.FAIL.toString();
        final String stopped = TaskInstanceStatus.STOP.toString();
        final String created = TaskInstanceStatus.CREATE.toString();
        final String running = TaskInstanceStatus.RUNNING.toString();

        int succeeded = 0;
        for (TaskInstanceDTO current : sessionInstances) {
            final String state = current.getStatus();
            if (success.equals(state)) {
                succeeded++;
                continue;
            }
            if (killed.equals(state)) {
                return killed;
            }
            if (failed.equals(state)) {
                return failed;
            }
            if (stopped.equals(state)) {
                return stopped;
            }
            if (created.equals(state) || running.equals(state)) {
                return running;
            }
            // Unrecognised status: ignored here, but it prevents the all-success result below.
        }

        return succeeded == sessionInstances.size() ? success : StringUtils.EMPTY;
    }

    /**
     * Looks up the task instance identified by a session id and a task id.
     *
     * @param sessionId passed to the mapper under key {@code sessionId}
     * @param taskId    passed to the mapper under key {@code taskId}
     * @return the mapper's result for that pair
     */
    public TaskInstanceDTO queryBySpecTaskInstance(Long sessionId, Long taskId) {
        final Map<String, Long> criteria = new HashMap<>();
        criteria.put("taskId", taskId);
        criteria.put("sessionId", sessionId);
        final TaskInstanceDTO match = taskInstanceMapper.queryBySpecTaskInstance(criteria);
        return match;
    }

    /**
     * Asks the mapper for the latest instance of the given task.
     *
     * @param taskId task id to query by
     * @return the mapper's result for that task
     */
    public TaskInstanceDTO queryLatestInstanceForTask(Long taskId) {
        final TaskInstanceDTO latest = taskInstanceMapper.queryLatestInstanceForTask(taskId);
        return latest;
    }

    /**
     * Resolves the output table names of the task instance with the given id.
     * Loads the instance via {@link #queryById(Long)} and delegates to
     * {@link #queryByTableName(TaskInstanceDTO)}.
     *
     * @param id task instance id
     * @return the table names resolved for that instance
     */
    public List<String> queryByTableName(Long id) {
        return queryByTableName(queryById(id));
    }

    /**
     * Resolves the output table/view names recorded on a task instance.
     * <p>
     * Only instances whose status is SUCCESS yield names. Where the names come from
     * depends on the instance type:
     * <ul>
     *   <li>DATA / CLEAN / ALGOPY / JLAB: every {@code inputInfo.output[*].tableName}
     *       of the instance's data JSON, passed through {@code ToolUtil.alignTableName(name, 0L)};</li>
     *   <li>ETL / ALGO: the third space-separated token of a {@code create view} /
     *       {@code create table} statement, or the second comma-separated argument
     *       (with its first and last character stripped) of a
     *       {@code select pipeline.sys_func_pivot_table_view_create} call, or otherwise every
     *       non-empty {@code result.output_params[*].out_table_name} of the log JSON;</li>
     *   <li>MODEL: the {@code target} field of the data JSON.</li>
     * </ul>
     * Missing or malformed JSON/SQL never raises; names collected so far are returned.
     *
     * @param dto task instance, may be {@code null}
     * @return the resolved names, never {@code null}, possibly empty
     */
    public List<String> queryByTableName(TaskInstanceDTO dto) {
        List<String> tableNames = new ArrayList<>();
        if (dto == null || dto.getType() == null) {
            return tableNames;
        }
        // Constant-first comparison so a missing status simply counts as "not successful".
        if (!TaskInstanceStatus.SUCCESS.name().equals(dto.getStatus())) {
            return tableNames;
        }

        if (matchesAnyType(dto, TaskTypeEnum.TASK_TYPE_DATA, TaskTypeEnum.TASK_TYPE_CLEAN,
                TaskTypeEnum.TASK_TYPE_ALGOPY, TaskTypeEnum.TASK_TYPE_JLAB)) {
            collectTablesFromDataJson(dto, tableNames);
        } else if (matchesAnyType(dto, TaskTypeEnum.TASK_TYPE_ETL, TaskTypeEnum.TASK_TYPE_ALGO)) {
            collectTablesFromSqlOrLog(dto, tableNames);
        } else if (matchesAnyType(dto, TaskTypeEnum.TASK_TYPE_MODEL)) {
            collectModelTarget(dto, tableNames);
        }
        return tableNames;
    }

    /** Returns true when the instance type equals the value of any of the given enum constants. */
    private boolean matchesAnyType(TaskInstanceDTO dto, TaskTypeEnum... candidates) {
        for (TaskTypeEnum candidate : candidates) {
            if (dto.getType().equals(candidate.getVal())) {
                return true;
            }
        }
        return false;
    }

    /** Appends the aligned {@code inputInfo.output[*].tableName} values of the data JSON. */
    private void collectTablesFromDataJson(TaskInstanceDTO dto, List<String> tableNames) {
        try {
            JSONObject data = JSONObject.parseObject(dto.getDataJson());
            JSONObject inputInfo = data == null ? null : data.getJSONObject("inputInfo");
            JSONArray output = inputInfo == null ? null : inputInfo.getJSONArray("output");
            if (output == null) {
                return;
            }
            for (int i = 0; i < output.size(); ++i) {
                String rawName = output.getJSONObject(i).getString("tableName");
                tableNames.add(ToolUtil.alignTableName(rawName, 0L));
            }
        } catch (RuntimeException e) {
            // Best effort: keep what was collected, but leave a trace instead of swallowing silently.
            logger.warn("queryByTableName: cannot read output tables from dataJson, instanceId={}",
                    dto.getId(), e);
        }
    }

    /** Appends the table name found in the SQL text, or else the ones listed in the log JSON. */
    private void collectTablesFromSqlOrLog(TaskInstanceDTO dto, List<String> tableNames) {
        String sqlText = dto.getSqlText();
        // Locale.ROOT keeps the prefix match independent of the JVM default locale.
        String loweredSql = StringUtils.isEmpty(sqlText) ? StringUtils.EMPTY : sqlText.toLowerCase(Locale.ROOT);

        if (loweredSql.startsWith("create view") || loweredSql.startsWith("create table")) {
            String[] tokens = sqlText.split(" ");
            if (tokens.length > 2) {
                tableNames.add(tokens[2]);
            }
            return;
        }
        if (loweredSql.startsWith("select pipeline.sys_func_pivot_table_view_create")) {
            String[] arguments = sqlText.split(",");
            // The second argument is stripped of its first and last character.
            if (arguments.length > 1 && arguments[1].length() >= 2) {
                tableNames.add(arguments[1].substring(1, arguments[1].length() - 1));
            }
            return;
        }

        try {
            JSONObject logObject = JSONObject.parseObject(dto.getLogInfo());
            JSONObject result = logObject == null ? null : logObject.getJSONObject("result");
            JSONArray outputParams = result == null ? null : result.getJSONArray("output_params");
            if (outputParams == null) {
                return;
            }
            for (int i = 0; i < outputParams.size(); ++i) {
                String tableName = outputParams.getJSONObject(i).getString("out_table_name");
                if (StringUtils.isNotEmpty(tableName)) {
                    tableNames.add(tableName);
                }
            }
        } catch (RuntimeException e) {
            logger.warn("queryByTableName: cannot read output tables from logInfo, instanceId={}",
                    dto.getId(), e);
        }
    }

    /** Appends the {@code target} field of the data JSON when it is present and non-empty. */
    private void collectModelTarget(TaskInstanceDTO dto, List<String> tableNames) {
        try {
            JSONObject data = JSONObject.parseObject(dto.getDataJson());
            String target = data == null ? null : data.getString("target");
            if (StringUtils.isNotEmpty(target)) {
                tableNames.add(target);
            }
        } catch (RuntimeException e) {
            logger.warn("queryByTableName: cannot read model target from dataJson, instanceId={}",
                    dto.getId(), e);
        }
    }

    /**
     * Queries the ids of tasks that have redundant tables.
     *
     * @param retainNum passed to the mapper under key {@code retainNum}
     * @param limitDate passed to the mapper under key {@code limitDate}
     * @return the task ids returned by the mapper
     */
    public List<Long> queryTaskIdHaveRedundantTable(Integer retainNum, String limitDate) {
        final Map<String, Object> criteria = new HashMap<>();
        criteria.put("limitDate", limitDate);
        criteria.put("retainNum", retainNum);
        final List<Long> taskIds = taskInstanceMapper.queryTaskIdHaveRedundantTable(criteria);
        return taskIds;
    }

    /**
     * Queries the ids the mapper reports as redundant tables in clean nodes.
     *
     * @param limitDate date limit forwarded to the mapper
     * @return the ids returned by the mapper
     */
    public List<Long> queryRedundantTableInCleanNode(String limitDate) {
        final List<Long> redundantIds = taskInstanceMapper.queryRedundantTableInCleanNode(limitDate);
        return redundantIds;
    }

    /**
     * Queries the instances of one task that the mapper reports as holding redundant tables.
     *
     * @param taskId    passed to the mapper under key {@code taskId}
     * @param limitDate passed to the mapper under key {@code limitDate}
     * @return the instances returned by the mapper
     */
    public List<TaskInstanceDTO> queryRedundantTableByTaskId(Long taskId, String limitDate) {
        final Map<String, Object> criteria = new HashMap<>();
        criteria.put("limitDate", limitDate);
        criteria.put("taskId", taskId);
        final List<TaskInstanceDTO> redundant = taskInstanceMapper.queryRedundantTableByTaskId(criteria);
        return redundant;
    }

    /**
     * Deletes a data-cleaning history record.
     * <p>
     * The instance is loaded first so the {@code table} field of its data JSON can be
     * handed back to the caller; the delete itself is issued through the mapper with
     * {@code taskId} and {@code id} as parameters. An instance without data JSON (or
     * without a {@code table} field) is still deleted; the returned name is then {@code null}.
     *
     * @param taskId owning task id, forwarded to the mapper
     * @param id     id of the task instance to delete
     * @return the GP view to be dropped by the caller, or {@code null} when the instance
     *         does not exist or records no table
     */
    @Transactional(rollbackFor = Exception.class)
    public String deleteActions(Long taskId, Long id) {
        TaskInstanceDTO taskInstanceDTO = taskInstanceMapper.queryById(id);
        if (null == taskInstanceDTO) {
            return null;
        }

        // parseObject yields null for null/empty input; do not let that abort (and roll back) the delete.
        JSONObject dataJson = JSONObject.parseObject(taskInstanceDTO.getDataJson());
        String table = dataJson == null ? null : dataJson.getString("table");

        // Kept as a raw Map on purpose: the mapper's parameter type is not visible from this file.
        Map map = Maps.newHashMap();
        map.put("taskId", taskId);
        map.put("id", id);
        taskInstanceMapper.deleteActions(map);
        return table;
    }

//    public void clearActions(List<TaskInstanceDTO> actions, Long taskId) {
//        if (actions == null || actions.isEmpty()) {
//            return;
//        } else if (actions.size() > 1) {
//            Long id = actions.get(1).getId();//清理第二条生成的表
//            TaskInstanceDTO taskInstanceDTO = taskInstanceMapper.queryById(id);
//            String table = JSONObject.parseObject(taskInstanceDTO.getDataJson()).getString("table");
//            if (!table.split("\\.")[0].equals(GREEN_PLUM_DEFAULT_SCHEMA)) {
//                gpDataProvider.dropRedundantTables(table, true);
//            }
//        }
//        this.deleteActions(taskId, actions.get(0).getId());
//    }

    /**
     * Deletes a single task instance.
     * The id is wrapped in a one-element array and passed to the mapper under key {@code ids}.
     *
     * @param id id of the task instance to delete
     */
    public void delete(Long id) {
        final Long[] ids = {id};
        final Map<String, Object> criteria = Maps.newHashMap();
        criteria.put("ids", ids);
        taskInstanceMapper.delete(criteria);
    }

    /**
     * Fetches the instances of a task via the mapper's {@code queryByTaskIdAndOrder} query.
     *
     * @param taskId task id to query by
     * @return the mapper's result list
     */
    public List<TaskInstanceDTO> queryByTaskIdAndOrder(Long taskId) {
        final List<TaskInstanceDTO> ordered = taskInstanceMapper.queryByTaskIdAndOrder(taskId);
        return ordered;
    }

    /**
     * Fetches the instances of a task via the mapper's {@code querySuccessInstanceByTaskId} query.
     *
     * @param taskId task id to query by
     * @return the mapper's result list
     */
    public List<TaskInstanceDTO> querySuccessInstanceByTaskId(Long taskId) {
        final List<TaskInstanceDTO> successful = taskInstanceMapper.querySuccessInstanceByTaskId(taskId);
        return successful;
    }

    /**
     * Fetches the instance ids the mapper returns for the given task ids.
     *
     * @param taskIds task ids to query by
     * @return the ids returned by the mapper
     */
    public List<Long> queryIdByTaskId(List<Long> taskIds) {
        final List<Long> instanceIds = taskInstanceMapper.queryIdByTaskId(taskIds);
        return instanceIds;
    }

    /**
     * Returns the first output table name of the latest instance of a task.
     *
     * @param id task id (forwarded to {@link #queryLatestInstanceForTask(Long)})
     * @return the first name resolved by {@link #queryByTableName(TaskInstanceDTO)}, or
     *         {@code null} when the task has no instance or the instance yields no table
     */
    public String getInstanceTableName(Long id) {
        final TaskInstanceDTO latest = queryLatestInstanceForTask(id);
        if (latest != null) {
            final List<String> produced = queryByTableName(latest);
            if (!CollectionUtil.isEmpty(produced)) {
                return produced.get(0);
            }
        }
        return null;
    }

    /**
     * Finds the instance of a task whose first output table matches the given name.
     * <p>
     * For each instance of the task, {@code inputInfo.output[0].tableName} of its data
     * JSON is passed through {@code ToolUtil.alignTableName(name, 0L)} and compared with
     * {@code tableName}. Instances whose data JSON is missing, malformed or has no output
     * entry are skipped, so one bad record no longer aborts the whole search.
     *
     * @param tableName table name to look for
     * @param taskId    task whose instances are searched
     * @return the first matching instance, or {@code null} when none matches
     */
    public TaskInstanceDTO queryInstanceByTableName(String tableName, Long taskId) {
        List<TaskInstanceDTO> taskInstances = queryByTaskId(taskId);
        if (tableName == null || taskInstances == null) {
            return null;
        }
        for (TaskInstanceDTO taskInstance : taskInstances) {
            if (tableName.equals(firstOutputTableName(taskInstance))) {
                return taskInstance;
            }
        }
        return null;
    }

    /** Returns the aligned name of the instance's first output table, or null when it has none. */
    private String firstOutputTableName(TaskInstanceDTO taskInstance) {
        try {
            JSONObject dataJson = JSONObject.parseObject(taskInstance.getDataJson());
            JSONObject inputInfo = dataJson == null ? null : dataJson.getJSONObject("inputInfo");
            JSONArray output = inputInfo == null ? null : inputInfo.getJSONArray("output");
            if (output == null || output.isEmpty()) {
                return null;
            }
            JSONObject first = output.getJSONObject(0);
            String rawName = first == null ? null : first.getString("tableName");
            return rawName == null ? null : ToolUtil.alignTableName(rawName, 0L);
        } catch (RuntimeException e) {
            logger.warn("queryInstanceByTableName: unreadable dataJson, instanceId={}",
                    taskInstance.getId(), e);
            return null;
        }
    }

    /**
     * Resolves one table name per input entry.
     * <p>
     * For the i-th input the default is {@code "dataset." + input[i].tableName}. When the
     * i-th parent id has a latest instance with an output table (see
     * {@link #getInstanceTableName(Long)}), that table name is used instead.
     *
     * @param input     input descriptors; each entry is read for its {@code tableName} field
     * @param parentIds parent task ids as decimal strings, positionally aligned with {@code input};
     *                  entries beyond its size fall back to the default name
     * @return one name per input entry, in input order
     * @throws NumberFormatException if a parent id is not a valid long
     */
    public List<String> getTableName(JSONArray input, List<String> parentIds) {
        List<String> names = new ArrayList<>();
        if (input == null) {
            return names;
        }
        for (int i = 0; i < input.size(); i++) {
            // Read the i-th descriptor; the previous code always read index 0, so every
            // input without a parent output table got the first input's table name.
            String tableName = "dataset." + input.getJSONObject(i).getString("tableName");
            if (parentIds != null && i < parentIds.size()) {
                String instanceTableName = getInstanceTableName(Long.parseLong(parentIds.get(i)));
                if (instanceTableName != null) {
                    tableName = instanceTableName;
                }
            }
            names.add(tableName);
        }
        return names;
    }

    /**
     * Fetches recent algorithm logs through the mapper's {@code queryAlgoRecentLogs} query.
     *
     * @param taskId    task id forwarded to the mapper
     * @param projectId project id forwarded to the mapper
     * @return the log entries returned by the mapper
     */
    public List<String> queryAlgoRecentLog(Long taskId, Long projectId) {
        final List<String> recentLogs = taskInstanceMapper.queryAlgoRecentLogs(taskId, projectId);
        return recentLogs;
    }

    /**
     * Returns one page of task instance information, each row enriched with the names of
     * its source datasets.
     * <p>
     * Sorting: a blank sort column, one of {@code runningTime} / {@code typeName} /
     * {@code sourceDataset}, or anything that is not a plain (optionally table-qualified)
     * SQL identifier falls back to {@code id}. The sort method is restricted to
     * {@code asc} / {@code desc} (default {@code desc}) and written back to {@code vo}.
     * Both values end up verbatim in the ORDER BY clause handed to PageHelper, so they
     * must never carry arbitrary request text.
     *
     * @param vo paging, sorting and filter parameters; its sort method is normalised in place
     * @return the page object produced by {@code PageUtils.getPageResult}; when rows exist its
     *         data is replaced by the enriched {@code TaskInstanceInfoDTO} list
     */
    public Object queryTaskInstancePageInfo(TaskInstancePageVO vo) {
        String sortColumn = StringUtils.trimToEmpty(vo.getSortColumn());
        if (sortColumn.isEmpty()) {
            sortColumn = "id";
        } else {
            //TODO runningTime differs from the other two fields: there is currently no
            // runningTime record, and the other two fields are not sortable.
            switch (sortColumn) {
                case "runningTime":
                case "typeName":
                case "sourceDataset":
                    sortColumn = "id";
                    break;
                default:
                    break;
            }
        }
        // Whitelist by shape: identifier, optionally qualified as alias.column.
        if (!sortColumn.matches("[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)?")) {
            sortColumn = "id";
        }

        String sortMethod = StringUtils.trimToEmpty(vo.getSortMethod());
        if (!"asc".equalsIgnoreCase(sortMethod) && !"desc".equalsIgnoreCase(sortMethod)) {
            sortMethod = "desc";
        }
        // Written back so the query DTO mapped from vo below carries the sanitised value too.
        vo.setSortMethod(sortMethod);

        String orderBy = sortColumn + " " + sortMethod;

        PageHelper.startPage(vo.getCurPage(), vo.getPageSize(), orderBy);
        List<TaskInstanceQueryDTO> tiqds = taskInstanceMapper.queryTaskInstanceInfo(
                DozerUtil.mapper(vo, TaskInstancePageQueryDTO.class));

        if (tiqds.isEmpty()) {
            return PageUtils.getPageResult(PageInfo.of(tiqds));
        }

        List<TaskInstanceInfoDTO> res = new ArrayList<>();
        for (TaskInstanceQueryDTO tiqd : tiqds) {
            // An instance without a parent is treated as its own source.
            boolean isSource = StringUtils.isBlank(tiqd.getParentId());
            res.add(tiqd.toInfo(queryTaskInstanceSourceDataset(tiqd.getId(), isSource)));
        }
        Page page = PageUtils.getPageResult(PageInfo.of(tiqds));
        page.setData(res);
        return page;
    }

    /**
     * Collects parent task instance ids for {@code id} into {@code pids}.
     * <p>
     * Each string returned by the mapper is either a single id, which is added to
     * {@code pids} directly, or a comma-separated list of ids, in which case this method
     * recurses into every listed id instead of adding them.
     *
     * @param id   task instance id whose parents are looked up
     * @param pids accumulator that receives the collected ids
     * @throws NumberFormatException if a returned id is not a valid long
     */
    public void queryParentTaskInstanceId(Long id, Set<Long> pids) {
        final List<String> parentRefs = taskInstanceMapper.queryParentTaskInstanceId(id);
        if (parentRefs == null || parentRefs.isEmpty()) {
            return;
        }
        for (String ref : parentRefs) {
            final String[] parts = ref.split(",");
            if (parts.length <= 1) {
                pids.add(Long.valueOf(ref));
                continue;
            }
            for (String part : parts) {
                queryParentTaskInstanceId(Long.valueOf(part), pids);
            }
        }
    }

    /**
     * Resolves the source dataset names for a task instance.
     *
     * @param id       task instance id
     * @param isSource when {@code true} the instance itself is used as the only source id;
     *                 otherwise the ids are collected via
     *                 {@link #queryParentTaskInstanceId(Long, Set)}
     * @return the names returned by the mapper for the collected ids, or an empty list when
     *         no parent id could be collected
     */
    public List<String> queryTaskInstanceSourceDataset(Long id, boolean isSource) {
        final Set<Long> sourceIds = new HashSet<>();
        if (isSource) {
            sourceIds.add(id);
        } else {
            queryParentTaskInstanceId(id, sourceIds);
            if (sourceIds.isEmpty()) {
                return new ArrayList<>();
            }
        }
        return taskInstanceMapper.queryTaskInstanceSourceDatasetName(sourceIds);
    }

    /**
     * Updates the status of an instance and the status recorded on its task.
     * <p>
     * With a non-empty {@code logInfo} the instance's log JSON gets {@code status=500} and
     * {@code logInfo} under both {@code success} and {@code error_msg}; if the previous log
     * was a JSON object its other fields are kept and {@code result} is reset to an empty
     * object, otherwise a fresh object is written. Without {@code logInfo} the log becomes a
     * plain "stopped by user" message. Afterwards {@code isSample} and {@code lastStatus} in
     * the task's data JSON are set to the status string.
     *
     * @param instance instance to update; mutated and persisted through the mapper
     * @param status   new status
     * @param logInfo  error text to record, or {@code null}/empty for a user-initiated stop
     */
    public void updateStatus(TaskInstanceDTO instance, TaskInstanceStatus status, String logInfo) {
        // Update the instance status.
        instance.setStatus(status.toString());
        if (null != logInfo && !logInfo.isEmpty()) {
            JSONObject logObj = parseLogInfoOrNull(instance.getLogInfo());
            boolean hadStructuredLog = logObj != null;
            if (!hadStructuredLog) {
                logObj = new JSONObject();
            }
            logObj.put("status", 500);
            logObj.put("success", logInfo);
            logObj.put("error_msg", logInfo);
            if (hadStructuredLog) {
                // Only a pre-existing log object gets its result reset.
                logObj.put("result", new JSONObject());
            }
            instance.setLogInfo(logObj.toJSONString());
        } else {
            // String.format needs %s; the former "{}" placeholder was emitted literally.
            instance.setLogInfo(String.format("[%s] task stopped by user", instance.getTaskName()));
        }
        taskInstanceMapper.update(instance);

        // Update the task's data JSON.
        TaskDTO taskDTO = taskMapper.queryById(instance.getTaskId());
        if (taskDTO == null) {
            logger.warn("updateStatus: task not found, taskId={}, instanceId={}",
                    instance.getTaskId(), instance.getId());
            return;
        }
        JSONObject oldDataJson = JSONObject.parseObject(taskDTO.getDataJson());
        if (oldDataJson == null) {
            logger.warn("updateStatus: task has no dataJson, taskId={}", instance.getTaskId());
            return;
        }
        // NOTE(review): isSample receives the status string, same as lastStatus - confirm this is intended.
        oldDataJson.put("isSample", status.toString());
        oldDataJson.put("lastStatus", status.toString());
        taskDTO.setDataJson(oldDataJson.toJSONString());
        taskMapper.update(taskDTO);
    }

    /** Parses the stored log info as a JSON object; returns null when it is absent or not a JSON object. */
    private JSONObject parseLogInfoOrNull(String rawLogInfo) {
        if (StringUtils.isEmpty(rawLogInfo)) {
            return null;
        }
        try {
            return JSONObject.parseObject(rawLogInfo);
        } catch (RuntimeException e) {
            // Plain-text logs are expected here; the caller starts a fresh object instead.
            return null;
        }
    }

    /**
     * Deletes the history actions of a task through the mapper.
     *
     * @param taskId task whose history actions are deleted
     */
    public void deleteHistoryActions(Long taskId) {
        final Long targetTaskId = taskId;
        taskInstanceMapper.deleteHistoryActions(targetTaskId);
    }

    /**
     * Queries one page of rows from the table recorded on a task instance.
     * <p>
     * The table name is the {@code table} field of the instance's data JSON. The result
     * holds {@code head} (column descriptors), {@code data} (rows with every value read as a
     * string) and {@code page} (paging info including the total row count). Column
     * descriptors are built from {@code output[0]} of the data JSON when available and fall
     * back to the JDBC result-set metadata otherwise. The default id column is returned in
     * the rows but has no head entry; columns whose name contains
     * {@code ColumnConstant.ROW_NUM} are dropped.
     * <p>
     * Failures while querying are logged and yield an empty (or partially filled) result
     * instead of an exception.
     *
     * @param instanceId id of the task instance
     * @param curPage    1-based page number
     * @param pageSize   number of rows per page
     * @return the result object described above; empty when the instance or its data JSON is missing
     */
    public JSONObject queryDataById(Long instanceId, Integer curPage, Integer pageSize) {
        JSONObject result = new JSONObject();
        TaskInstanceDTO taskInstance = taskInstanceMapper.queryById(instanceId);
        if (taskInstance == null) {
            logger.warn("TaskInstanceService queryDataById: task instance not found, instanceId={}", instanceId);
            return result;
        }
        JSONObject dataJsonObj = JSONObject.parseObject(taskInstance.getDataJson());
        if (dataJsonObj == null) {
            logger.warn("TaskInstanceService queryDataById: task instance has no dataJson, instanceId={}", instanceId);
            return result;
        }
        String tableName = dataJsonObj.getString("table");

        Page<JSONArray> page = new Page<>();
        int offset = (curPage - 1) * pageSize;
        page.setPageSize(pageSize);
        page.setCurPage(curPage);
        String orderBy = SqlUtil.formatPGSqlColName(DataCleanSqlHelper.IDCOLUMN_NAME) + " ASC";

        Connection conn = null;
        try {
            conn = gpDataProvider.getConn(1L);
            // Statement and result sets are closed here; the connection is released in finally.
            try (Statement st = conn.createStatement()) {
                // Total number of rows.
                String countSql = String.format(DatabaseConstant.GP_COUNT_SQL, tableName);
                try (ResultSet countRs = st.executeQuery(countSql)) {
                    if (countRs.next()) {
                        Integer recordsAmount = countRs.getInt(1);
                        page.setTotalElementsAndPage(recordsAmount);
                    }
                }

                // NOTE(review): tableName comes from stored task JSON and is formatted into the SQL as-is.
                String dataSql = String.format("SELECT * FROM %s ORDER BY %s LIMIT %s OFFSET %s",
                        tableName, orderBy, pageSize, offset);

                // Actual data query.
                try (ResultSet rs = st.executeQuery(dataSql)) {
                    ResultSetMetaData meta = rs.getMetaData();

                    JSONArray heads = new JSONArray();
                    List<String> colNames = new ArrayList<>();
                    boolean builtFromJson;
                    try {
                        builtFromJson = fillHeadsFromTaskJson(dataJsonObj, meta, heads, colNames);
                    } catch (RuntimeException e) {
                        logger.debug("queryDataById: column metadata in dataJson unusable, instanceId={}",
                                instanceId, e);
                        builtFromJson = false;
                    }
                    if (!builtFromJson) {
                        // Discard anything half-built so the fallback cannot produce duplicate columns.
                        heads.clear();
                        colNames.clear();
                        fillHeadsFromResultSetMeta(meta, heads, colNames);
                    }

                    // Build the data rows.
                    JSONArray rows = new JSONArray();
                    while (rs.next()) {
                        JSONObject row = new JSONObject();
                        for (String colName : colNames) {
                            row.put(colName, rs.getString(colName));
                        }
                        rows.add(row);
                    }
                    result.put("head", heads);
                    result.put("data", rows);
                    result.put("page", page);
                }
            }
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost.
            logger.error("TaskInstanceService queryDataById error, table={}", tableName, e);
        } finally {
            JDBCUtil.close(conn, null, null);
        }

        return result;
    }

    /**
     * Builds head entries and the selected column list from output[0] of the task data JSON.
     * Returns false when that metadata is absent or incomplete.
     */
    private boolean fillHeadsFromTaskJson(JSONObject dataJsonObj, ResultSetMetaData meta,
                                          JSONArray heads, List<String> colNames) throws SQLException {
        JSONArray output = dataJsonObj.getJSONArray("output");
        if (output == null || output.isEmpty()) {
            return false;
        }
        JSONObject info = output.getJSONObject(0);
        if (info == null) {
            return false;
        }
        JSONObject semantics = info.getJSONObject("semantic");
        JSONArray tableCols = info.getJSONArray("tableCols");
        JSONArray columnTypes = info.getJSONArray("columnTypes");
        if (tableCols == null || columnTypes == null || columnTypes.size() < tableCols.size()) {
            return false;
        }

        Map<String, String> typeByColumn = new HashMap<>();
        for (int i = 0; i < tableCols.size(); i++) {
            typeByColumn.put(tableCols.getString(i), columnTypes.getString(i));
        }

        int colCount = meta.getColumnCount();
        for (int i = 1; i <= colCount; i++) {
            String columnName = meta.getColumnName(i);
            if (DatasetConstant.DEFAULT_ID_FIELD.equals(columnName)) {
                // The id column is selected for the rows but gets no head entry.
                colNames.add(columnName);
                continue;
            }
            if (columnName.contains(ColumnConstant.ROW_NUM)) {
                continue;
            }
            if (!typeByColumn.containsKey(columnName)) {
                // Columns not described in the JSON are left out entirely.
                continue;
            }
            String declaredType = typeByColumn.get(columnName);
            String normalizedType = declaredType.toLowerCase(Locale.ROOT);

            JSONObject head = new JSONObject();
            head.put("name", columnName);
            if (DataTypeEnum.DATE.getValue().equals(normalizedType)) {
                head.put("desc", normalizedType);
                head.put("type", Types.DATE);
            } else if (DataTypeEnum.JSON.getValue().equals(normalizedType)) {
                head.put("desc", normalizedType);
                head.put("type", Types.OTHER);
            } else if (DataTypeEnum.ARRAY.getValue().equals(normalizedType)) {
                head.put("desc", normalizedType);
                head.put("type", Types.ARRAY);
            } else {
                head.put("type", SqlUtil.encodeType(declaredType));
                head.put("desc", SqlUtil.changeType(declaredType));
            }
            if (null != semantics) {
                head.put("semantic", semantics.getString(columnName));
            }
            heads.add(head);
            colNames.add(columnName);
        }
        return true;
    }

    /** Builds head entries and the selected column list from the JDBC result-set metadata alone. */
    private void fillHeadsFromResultSetMeta(ResultSetMetaData meta, JSONArray heads,
                                            List<String> colNames) throws SQLException {
        int colCount = meta.getColumnCount();
        for (int i = 1; i <= colCount; i++) {
            String columnName = meta.getColumnName(i);
            if (DatasetConstant.DEFAULT_ID_FIELD.equals(columnName)) {
                colNames.add(columnName);
                continue;
            }
            if (columnName.contains(ColumnConstant.ROW_NUM)) {
                continue;
            }
            JSONObject head = new JSONObject();
            head.put("name", columnName);
            head.put("type", meta.getColumnType(i));
            head.put("desc", SqlUtil.changeType(meta.getColumnTypeName(i)));
            heads.add(head);
            colNames.add(columnName);
        }
    }
}
